# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from hysop.constants import Implementation, DirectionLabels, TranspositionState
from hysop.fields.continuous_field import Field, ScalarField, TensorField
from hysop.tools.htypes import check_instance, first_not_None, to_tuple
from hysop.tools.decorators import debug
from hysop.tools.numpywrappers import npw
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.core.graph.computational_node_frontend import ComputationalGraphNodeFrontend
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.operator.directional.directional import DirectionalOperatorGeneratorI
from hysop.testsenv import __HAS_OPENCL_BACKEND__
[docs]
class SpaceDerivative(ComputationalGraphNodeFrontend):
"""
Operator frontend to compute the derivative of a component
of a field in a given direction using a given method.
Currently two methods are supported:
*Finite differences: FiniteDifferencesSpaceDerivative
*Spectral method: SpectralSpaceDerivative
Those two classes can be passed to the more general MultiSpaceDerivatives
and Gradient operator generators via the 'cls' keyword argument during
__init__.
"""
[docs]
@classmethod
def spectral(cls, *args, **kwds):
"""SpaceDerivative.spectral(...) <=> SpectralSpaceDerivative(...)"""
return SpectralSpaceDerivative(*args, **kwds)
[docs]
@classmethod
def fd(cls, *args, **kwds):
"""SpaceDerivative.fd(...) <=> FiniteDifferencesSpaceDerivative(...)"""
return FiniteDifferencesSpaceDerivative(*args, **kwds)
@debug
def __new__(
cls,
F,
dF,
A=None,
derivative=None,
direction=None,
name=None,
pretty_name=None,
variables=None,
implementation=None,
base_kwds=None,
**kwds,
):
return super().__new__(
cls,
F=F,
dF=dF,
A=A,
direction=direction,
derivative=derivative,
variables=variables,
implementation=implementation,
base_kwds=base_kwds,
name=name,
pretty_name=pretty_name,
**kwds,
)
@debug
def __init__(
self,
F,
dF,
A=None,
derivative=None,
direction=None,
name=None,
pretty_name=None,
variables=None,
implementation=None,
base_kwds=None,
**kwds,
):
"""
SpaceDerivative operator frontend.
Compute the derivative of a field in a given direction
on a given backend, possibly scaled by another field/parameter/value.
dF = A * dF^m/d(x0^k0 x1^k1 ... xn^kn)
where k
where F is an input field
dF is an output field
A is a Field, a Parameter or a scalar.
(k0,...,kn) are positive integers
m=sum(ki)
n=F.dim-1
Parameters
----------
F: hysop.field.continuous_field.Field
Continuous field as input.
dF: hysop.field.continuous_field.Field
Continuous field to be written.
Some backend may allow inplace differentiation.
A: numerical value, ScalarParameter or Field, optional
Scaling for convenience.
derivative: int or tuple, optional
Which derivative to generate, defaults to (0,)*(dim-1)+(1,).
ie. first order derivative in X axis.
If integer is given, the derivative is taken in given direction.
direction: int, optional
Directions in which to take the derivative.
Defaults to None.
Should be None if derivative is a tuple.
name: str, optional
Name of this operator.
pretty_name: str, optional
Pretty name of this operator.
variables: dict, optional
Dictionary of fields as keys and topology descriptors as values.
implementation: Implementation, optional, defaults to None
target implementation, should be contained in available_implementations().
If None, implementation will be set to default_implementation().
base_kwds: dict, optional
Base class keyword arguments.
kwds: dict, optional
Extra parameters passed towards operator implementation.
Notes
-----
There is two way to build a derivative:
(1) derivative(int) + direction(int) gives:
=> derivative=(0,0,0,0,kd,0,0,0)
where the index of kd is direction
and kd=derivative
(2) derivative(tuple) + direction(None) gives:
=> derivative=(k0,...,kn)
"""
check_instance(F, ScalarField)
check_instance(dF, ScalarField)
check_instance(derivative, (tuple, int), allow_none=True)
check_instance(direction, int, allow_none=True)
check_instance(
variables,
dict,
keys=Field,
values=CartesianTopologyDescriptors,
allow_none=True,
)
check_instance(implementation, Implementation, allow_none=True)
check_instance(base_kwds, dict, keys=str, allow_none=True)
check_instance(name, str, allow_none=True)
check_instance(pretty_name, str, allow_none=True)
assert F in variables
assert dF in variables
super().__init__(
F=F,
dF=dF,
A=A,
direction=direction,
derivative=derivative,
variables=variables,
implementation=implementation,
base_kwds=base_kwds,
name=name,
pretty_name=pretty_name,
**kwds,
)
[docs]
@classmethod
def implementations(cls):
raise NotImplementedError
[docs]
@classmethod
def default_implementation(cls):
raise NotImplementedError
[docs]
class SpectralSpaceDerivative(SpaceDerivative):
"""
Operator frontend to compute the derivative of a component
of a field in a given direction using spectral methods.
"""
[docs]
@classmethod
def implementations(cls):
from hysop.backend.host.python.operator.derivative import (
PythonSpectralSpaceDerivative,
)
__implementations = {
Implementation.PYTHON: PythonSpectralSpaceDerivative,
}
if __HAS_OPENCL_BACKEND__:
from hysop.backend.device.opencl.operator.derivative import (
OpenClSpectralSpaceDerivative,
)
__implementations.update(
{
Implementation.OPENCL: OpenClSpectralSpaceDerivative,
}
)
return __implementations
[docs]
@classmethod
def default_implementation(cls):
return Implementation.PYTHON
[docs]
class FiniteDifferencesSpaceDerivative(SpaceDerivative):
r"""
Operator frontend to compute the derivative of a component
of a field in a given direction using finite differences.
/!\ FiniteDifferencesSpaceDerivative only supports directional derivatives /!\
"""
[docs]
@classmethod
def implementations(cls):
from hysop.backend.host.python.operator.derivative import (
PythonFiniteDifferencesSpaceDerivative,
)
__implementations = {
Implementation.PYTHON: PythonFiniteDifferencesSpaceDerivative,
}
if __HAS_OPENCL_BACKEND__:
from hysop.backend.device.opencl.operator.derivative import (
OpenClFiniteDifferencesSpaceDerivative,
)
__implementations.update(
{Implementation.OPENCL: OpenClFiniteDifferencesSpaceDerivative}
)
return __implementations
[docs]
@classmethod
def default_implementation(cls):
return Implementation.PYTHON
[docs]
class MultiSpaceDerivatives(
DirectionalOperatorGeneratorI, ComputationalGraphNodeGenerator
):
"""Generate multiple SpaceDerivative operators at once."""
@debug
def __new__(
mcls,
Fs,
dFs,
As=None,
cls=FiniteDifferencesSpaceDerivative,
directions=None,
derivatives=None,
extra_params=None,
base_kwds=None,
variables=None,
**op_kwds,
):
base_kwds = {} if (base_kwds is None) else base_kwds
return super().__new__(mcls, **base_kwds)
@debug
def __init__(
self,
Fs,
dFs,
As=None,
cls=FiniteDifferencesSpaceDerivative,
directions=None,
derivatives=None,
extra_params=None,
base_kwds=None,
variables=None,
**op_kwds,
):
"""
Create a operator generator that can handle multiple SpaceDerivative operators.
Operators are created in input parameter order.
Tuple parameters are zipped together with vectorized scalar parameters.
The number of operators is determined by len(Fs) and len(dFs).
If any parameter has not the expected size (ie. len(Fs) or 1), a ValueError is raised.
This operator can also be used as a directional operator generator.
Refer to hysop.operator.derivative.SpaceDerivative for all arguments.
"""
from hysop.operator.min_max import MinMaxDerivativeStatistics
if not issubclass(cls, (SpaceDerivative, MinMaxDerivativeStatistics)) or (
cls in (SpaceDerivative, MinMaxDerivativeStatistics)
):
msg = "cls should be a subclass of SpaceDerivative or MinMaxSpaceDerivativeStatistics, got {}."
msg += "\ncls MRO is:\n "
msg += "\n ".join(str(t) for t in cls.__mro__)
msg = msg.format(cls)
raise TypeError(msg)
base_kwds = first_not_None(base_kwds, {})
extra_params = first_not_None(extra_params, {})
super().__init__(**base_kwds)
Fs = to_tuple(Fs)
dFs = to_tuple(dFs)
check_instance(Fs, tuple, values=ScalarField)
check_instance(dFs, tuple, values=ScalarField, size=len(Fs))
nb_ops = len(Fs)
def fmt(x):
if isinstance(x, npw.ndarray):
x = x.ravel()
x = to_tuple(x)
if len(x) == 1:
x *= nb_ops
elif len(x) != nb_ops:
msg = "Length mismatch for {}, expected size {}."
msg = msg.format(x, nb_ops)
raise ValueError(msg)
return x
params = {
"F": fmt(Fs),
"dF": fmt(dFs),
"A": fmt(As),
"direction": fmt(directions),
"derivative": fmt(derivatives),
}
# Extract variables for each operator
_variables = ()
for f, df, a in zip(Fs, dFs, params["A"]):
tf = ComputationalGraphNode.get_topo_descriptor(variables, f)
tdf = ComputationalGraphNode.get_topo_descriptor(variables, df)
var = {f: tf, df: tdf}
if isinstance(a, ScalarField):
ta = ComputationalGraphNode.get_topo_descriptor(variables, a)
var[a] = ta
_variables += (var,)
params["variables"] = _variables
params.update({k: fmt(v) for (k, v) in extra_params.items()})
self._params = params
self._cls = cls
self._op_kwds = op_kwds
self.directions = params["direction"]
@debug
def _generate(self):
"""Generate all operators."""
params, cls = self._params, self._cls
operators = ()
for p in zip(*tuple(params.values())):
op_kwds = self._op_kwds.copy()
op_kwds.update(dict(zip(params.keys(), p)))
op = cls(**op_kwds)
operators += (op,)
return operators
[docs]
def generate_only_once_per_direction(self):
return True
[docs]
@debug
def generate_direction(self, i, dt_coeff):
should_generate = super().generate_direction(i=i, dt_coeff=dt_coeff)
if not should_generate:
return ()
directions = self.directions
ids = tuple(j for j in range(len(directions)) if directions[j] == i)
ops = tuple(self.nodes[j] for j in ids)
return ops
[docs]
@debug
def generate(self, **kwds):
if "splitting_dim" in kwds:
splitting_dim = kwds["splitting_dim"]
assert splitting_dim > max(self.directions)
else:
kwds["splitting_dim"] = max(self.directions)
ops = super().generate(**kwds)
assert ops is not None, self.__class__.__mro__
return ops